package com.econ.springboot.kafka.producer.job;

import com.alibaba.fastjson.JSON;
import com.econ.springboot.kafka.producer.domain.AnalogData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Scheduled job that generates random {@link AnalogData} records and publishes them,
 * serialized as JSON arrays, to a Kafka topic. Also declares that topic as a bean.
 *
 * @author jwc
 * @date 2022/10/26
 */
@Slf4j
@Configuration
@EnableScheduling
@RequiredArgsConstructor
public class Kafka2MongoTask {
    /** Name of the topic that is declared by {@link #batchTopic()} and written to by the job. */
    private static final String TOPIC = "test-jwc5";
    /** Partition count used when declaring the topic. */
    private static final int TOPIC_PARTITIONS = 6;
    /** Replication factor used when declaring the topic. */
    private static final short TOPIC_REPLICATION_FACTOR = 1;
    /** Number of Kafka messages sent per scheduled run. */
    private static final int BATCHES_PER_RUN = 3;
    /** Number of records packed into each Kafka message. */
    private static final int RECORDS_PER_BATCH = 8000;

    // NOTE(review): raw type kept on purpose so constructor injection resolves exactly as before;
    // consider KafkaTemplate<String, String> once the template bean's generics are confirmed.
    private final KafkaTemplate kafkaTemplate;

    // Not used by the active code in this class; retained so bean wiring stays unchanged.
    @Resource(name = "taskExecutor")
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Declares the topic the job publishes to.
     *
     * @return a topic definition named {@code test-jwc5} with 6 partitions and replication factor 1
     */
    @Bean
    public NewTopic batchTopic() {
        return new NewTopic(TOPIC, TOPIC_PARTITIONS, TOPIC_REPLICATION_FACTOR);
    }

    /**
     * Builds {@value #BATCHES_PER_RUN} batches of {@value #RECORDS_PER_BATCH} random records,
     * sends each batch to Kafka as one JSON-array message, and logs the elapsed milliseconds.
     * Triggered by the cron expression on this method.
     */
    @Scheduled(cron = "0/2 * * * * ? ")
    void produceMessage() {
        long totalStart = System.currentTimeMillis();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int batch = 0; batch < BATCHES_PER_RUN; batch++) {
            // Size is known up front, so a presized ArrayList avoids per-node allocation.
            List<AnalogData> records = new ArrayList<>(RECORDS_PER_BATCH);
            for (int j = 0; j < RECORDS_PER_BATCH; j++) {
                String pid = getRandomWords(3, 2) + "." + getRandomWords(3, 2);
                // id = pid, a dot, the current epoch millis, then a random number in [0, 100).
                String id = pid + "." + System.currentTimeMillis() + random.nextInt(100);
                records.add(new AnalogData().setId(id).setPid(pid).setM(100).setV("1.00"));
            }
            kafkaTemplate.send(TOPIC, JSON.toJSONString(records));
        }
        log.info("生产消息总用时：{}", System.currentTimeMillis() - totalStart);
    }

    /**
     * Returns a random string of letters drawn from {@link #words}.
     *
     * <p>The length is chosen uniformly from {@code begin} to {@code begin + offset}, inclusive.
     * If the resulting length is not positive, an empty string is returned.
     *
     * @param begin  minimum length
     * @param offset maximum number of extra characters added on top of {@code begin}
     * @return the generated string
     * @throws IllegalArgumentException if {@code offset + 1} is not positive
     * @throws NullPointerException     if either argument is {@code null}
     */
    public static String getRandomWords(Integer begin, Integer offset) {
        // A time-seeded Random yields the same first draw for every call made within one
        // millisecond, so lengths were correlated; ThreadLocalRandom does not have that problem.
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // nextInt's upper bound is exclusive, hence the + 1 to make begin + offset reachable.
        int length = random.nextInt(offset + 1) + begin;
        StringBuilder result = new StringBuilder(Math.max(length, 0));
        for (int i = 0; i < length; i++) {
            char letter = words[random.nextInt(words.length)];
            result.append(letter);
        }
        return result.toString();
    }

    // Source alphabet for getRandomWords; digits could be added if needed. Per the original
    // author, letters whose upper and lower case are hard to tell apart were removed
    // (uppercase 'C' is absent from the list).
    public static Character[] words = new Character[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l' ,'m', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
            'A', 'B', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'};

}
